
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Hive table-generating function that flattens a three-level nested JSON
 * string into {@code (name, value)} rows.
 *
 * <p>The input is expected to look like
 * {@code {"namespace": {"dimension": {"feature": value}}}}. Each innermost
 * entry produces one output row whose {@code name} column is
 * {@code namespace:dimension:feature} and whose {@code value} column is the
 * string form of the JSON value.
 */
public class FeatureParseUDTF extends GenericUDTF
{

    private static final Logger LOG = Logger.getLogger(FeatureParseUDTF.class.getName());

    /** Inspector used to read the single string argument; set by {@link #initialize}. */
    private PrimitiveObjectInspector stringOI = null;

    /**
     * Validates the call signature and declares the output schema.
     *
     * @param objectInspectors inspectors for the arguments of the call; exactly one
     *                         string-typed argument is accepted
     * @return a struct inspector describing two string columns, {@code name} and {@code value}
     * @throws UDFArgumentException if the argument count is not one or the argument is not a string
     */
    @Override
    public StructObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException
    {

        // Argument validation.
        if (objectInspectors.length != 1) {
            throw new UDFArgumentException("FeatureParseUDTF() takes exactly one argument");
        }

        // The two conditions must be joined with ||: the cast is only evaluated once the
        // category is known to be PRIMITIVE, and either failing condition rejects the argument.
        if (objectInspectors[0].getCategory() != ObjectInspector.Category.PRIMITIVE
                || ((PrimitiveObjectInspector) objectInspectors[0]).getPrimitiveCategory()
                        != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentException("FeatureParseUDTF() takes a string as a parameter");
        }

        // Input.
        stringOI = (PrimitiveObjectInspector) objectInspectors[0];

        // Output: two string columns.
        List<String> fieldNames = new ArrayList<String>(2);
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);

        // Output column names.
        fieldNames.add("name");
        fieldNames.add("value");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }


    /**
     * Parses one input row and forwards one output row per feature found.
     *
     * @param record the call arguments; only {@code record[0]} is read
     * @throws HiveException if forwarding a row fails
     */
    @Override
    public void process(Object[] record) throws HiveException {

        if (record == null || record.length == 0) {
            return;
        }

        final Object raw = stringOI.getPrimitiveJavaObject(record[0]);
        if (raw == null) {
            // A NULL input produces no output rows instead of a NullPointerException.
            return;
        }

        for (Object[] row : parseInputRecord(raw.toString())) {
            forward(row);
        }
    }

    /**
     * Parses a JSON-formatted feature string into multiple rows.
     *
     * <p>Parsing is best-effort: if the text cannot be parsed, or an entry does not have
     * the expected nested-object shape, the failure is logged and the rows collected up
     * to that point are returned.
     *
     * @param feature JSON text of the form {@code {"ns": {"dim": {"feature": value}}}};
     *                may be {@code null}
     * @return a list, never {@code null}, of two-element arrays
     *         {@code [name, value]} where {@code name} is {@code ns:dim:feature} and
     *         {@code value} is the string form of the JSON value ({@code null} for a
     *         JSON null); empty when nothing could be parsed
     */
    public ArrayList<Object[]> parseInputRecord(String feature){
        // Created up front so that a parse failure yields an empty list rather than null.
        ArrayList<Object[]> resultList = new ArrayList<Object[]>();
        if (feature == null) {
            return resultList;
        }
        try {
            JSONObject json = JSON.parseObject(feature);
            if (json == null) {
                // Guard against a null parse result (nothing to iterate).
                return resultList;
            }
            for (String nameSpace : json.keySet())
            {
                JSONObject dimensionJson = json.getJSONObject(nameSpace);
                if (dimensionJson == null) {
                    continue;
                }
                for (String dimensionName : dimensionJson.keySet())
                {
                    JSONObject featureJson = dimensionJson.getJSONObject(dimensionName);
                    if (featureJson == null) {
                        continue;
                    }
                    for (String featureName : featureJson.keySet())
                    {
                        Object value = featureJson.get(featureName);
                        Object[] item = new Object[2];
                        item[0] = nameSpace + ":" + dimensionName + ":" + featureName;
                        // The output column is declared with javaStringObjectInspector,
                        // so the value must be a java.lang.String (or null).
                        item[1] = (value == null) ? null : value.toString();
                        resultList.add(item);
                    }
                }
            }
        } catch (Exception e)
        {
            // Deliberately best-effort: one malformed record must not fail the whole query.
            LOG.log(Level.WARNING, "Failed to parse feature JSON; keeping rows parsed so far", e);
        }
        return resultList;
    }


    /**
     * No-op: this function holds no resources and emits no trailing rows.
     */
    @Override
    public void close() throws HiveException
    {

    }
}
// Note: this example is already about three years old.
// Source: https://www.jianshu.com/p/ac352ceab9cd